package day04;

import beans.SensorReading;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 容错机制
 *
 * @author lvbingbing
 * @date 2022-01-12 21:44
 */
public class FaultTolerance {
    public static void main(String[] args) throws Exception {
        // 1、获取可执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、设置并行度
        int parallelism = 1;
        env.setParallelism(parallelism);
        // 3、学习容错机制相关 API
        studyFaultTolerance(env);
        DataStream<String> inputStream = env.socketTextStream("hadoop102", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        dataStream.print();
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 容错机制相关 API
     *
     * @param env <br>
     */
    private static void studyFaultTolerance(StreamExecutionEnvironment env) {
        // 1、检查点配置
        env.enableCheckpointing(300);
        // 高级选项
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setCheckpointTimeout(60000L);
        checkpointConfig.setMaxConcurrentCheckpoints(2);
        checkpointConfig.setMinPauseBetweenCheckpoints(100L);
        checkpointConfig.setPreferCheckpointForRecovery(true);
        checkpointConfig.setTolerableCheckpointFailureNumber(0);
        // 2、重启策略配置
        // 固定延迟重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
        // 失败率重启
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
    }
}
